Created
May 13, 2023 02:23
-
-
Save diegofps/87945a0c3e800c747f3af07833ff6b7e to your computer and use it in GitHub Desktop.
A simple Python gateway implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import logging as log
import os
import random
import sys
import time
import traceback
from queue import Queue
from threading import Lock, Thread
# Configure the root logger once at import time: every record from DEBUG up
# is emitted as "<logger name> - <level> - <message>".
log.basicConfig(
    level=log.DEBUG,
    format='%(name)s - %(levelname)s - %(message)s',
)
class AnySerialReader:
    """Simulated input endpoint that produces a random event every 0-5 seconds.

    It stands in for a real serial/MQTT client: no device is opened, events
    are picked at random from ``EVENTS``.
    """

    # Payloads the simulation can emit.
    EVENTS = ('event_a', 'event_b', 'event_c')

    def __init__(self, name="AnySerialReader"):
        """Store the display name and schedule the first simulated event."""
        self.name = name
        self.next_event = time.time() + random.random() * 5

    def read(self, timeout=None):
        """Return the next simulated event, or ``None`` if none is due.

        Args:
            timeout: Maximum number of seconds to wait for the next event.
                ``None`` (the default) or a non-positive value polls once
                and returns immediately.

        Returns:
            One of ``EVENTS`` when an event became due, otherwise ``None``.
        """
        wait = self.next_event - time.time()
        if wait > 0:
            if timeout is None or timeout <= 0:
                return None
            if wait > timeout:
                # Sleep for the whole timeout instead of returning at once,
                # so a caller looping on read() does not spin the CPU.
                time.sleep(timeout)
                return None
            time.sleep(wait)

        self.next_event = time.time() + random.random() * 5
        data = random.choice(self.EVENTS)
        log.debug('')
        log.debug(f'{self.name} => {data}')
        return data

    def terminate(self):
        """Log that this endpoint is being shut down."""
        log.warning(f"Terminating {self.name}")


class AnySerialWriter:
    """Simulated output endpoint: written data is only logged."""

    def __init__(self, name="AnySerialWriter"):
        """Store the display name used in log messages."""
        self.name = name

    def write(self, data):
        """Log ``data`` as if it had been sent to the device."""
        log.debug(f'{self.name} <= {data}')

    def terminate(self):
        """Log that this endpoint is being shut down."""
        log.warning(f"Terminating {self.name}")


class IotReader(Thread):
    """Thread that reads events from an endpoint and posts them to a queue.

    Each event is put on ``queue_master`` as an ``(event_name, data)`` tuple.
    The thread starts itself at the end of ``__init__``.
    """

    # Action tag attached to every event posted by this reader; subclasses
    # override it so the consumer can tell the sources apart.
    event_name = 'on_iot_event'

    def __init__(self, queue_master, name='IotReader'):
        """Create and immediately start the reader thread.

        Args:
            queue_master: Queue that receives ``(event_name, data)`` tuples.
            name: Thread name, also used in log messages.
        """
        super().__init__()
        self.queue_master = queue_master
        self.queue = Queue()
        self.done = False
        self.name = name
        self.start()

    def terminate(self):
        """Ask the thread to stop; ``run`` notices after its current read."""
        self.done = True

    def run(self):
        """Poll the endpoint until ``terminate`` is called."""
        log.info(f"Starting thread for {self.name}")
        serial_reader = AnySerialReader('Serial' + self.name)
        log.info(f"Serial reader for {self.name} initialized")

        try:
            while not self.done:
                try:
                    # The 1 s timeout bounds how long terminate() goes unnoticed.
                    data = serial_reader.read(timeout=1)
                    if data is not None:
                        self.queue_master.put((self.event_name, data))
                except Exception:
                    # Keep the thread alive on per-read failures.
                    traceback.print_exc(file=sys.stdout)
        finally:
            log.warning(f"Terminating {self.name}")
            serial_reader.terminate()


class IotWriter(Thread):
    """Thread that writes queued messages to an endpoint.

    Work arrives on ``self.queue`` as ``(action, data)`` tuples. The thread
    starts itself at the end of ``__init__``.
    """

    def __init__(self, name='IotWriter'):
        """Create and immediately start the writer thread.

        Args:
            name: Thread name, also used in log messages.
        """
        super().__init__()
        self.queue = Queue()
        self.done = False
        self.name = name
        self.start()

    def terminate(self):
        """Ask the thread to stop.

        The sentinel wakes ``run`` if it is blocked on an empty queue.
        Messages still queued when ``done`` is seen are not written.
        """
        self.done = True
        self.queue.put(('terminate', None))

    def send(self, data):
        """Queue ``data`` to be written by the thread."""
        self.queue.put(('write_message', data))

    def run(self):
        """Process queued actions until ``terminate`` is called."""
        log.info(f"Starting thread for {self.name}")
        serial_writer = AnySerialWriter('Serial' + self.name)
        log.info(f"Serial writer for {self.name} initialized")

        try:
            while not self.done:
                try:
                    action, data = self.queue.get()
                    if action == 'terminate':
                        break
                    if action == 'write_message':
                        serial_writer.write(data)
                    else:
                        log.error(f'Unknown action for IotWriter - action={action}, data={data}')
                except Exception:
                    # Keep the thread alive on per-message failures.
                    traceback.print_exc(file=sys.stdout)
        finally:
            log.warning(f"Terminating {self.name}")
            serial_writer.terminate()


# The MQTT classes are defined after their base classes: a class statement
# evaluates its bases immediately, so the bases must already exist.
class MqttReader(IotReader):
    """Reader for the MQTT side; posts its events as ``'on_mqtt_event'``."""

    event_name = 'on_mqtt_event'

    def __init__(self, queue_master):
        """Create and start the reader thread under the name 'MqttReader'."""
        super().__init__(queue_master, 'MqttReader')


class MqttWriter(IotWriter):
    """Writer for the MQTT side."""

    def __init__(self):
        """Create and start the writer thread under the name 'MqttWriter'."""
        super().__init__('MqttWriter')
class Gateway(Thread):
    """Thread that forwards events between the IoT side and the MQTT side.

    It creates one reader and one writer per side. Both readers post
    ``(action, data)`` tuples on ``queue_master``; ``run`` consumes them and
    hands the data to the writer of the opposite side. The thread starts
    itself at the end of ``__init__``.
    """

    def __init__(self):
        """Create the shared event queue and immediately start the thread."""
        super().__init__()
        self.queue_master = Queue()
        self.done = False
        self.start()

    def run(self):
        """Set up the workers and route events until ``terminate`` is called.

        If setting up the workers fails, the error is logged, the workers
        created so far are stopped, and the setup is attempted again.
        """
        log.info("Starting Gateway")

        while not self.done:
            # Only the workers created in this round are stopped afterwards,
            # so a failure halfway through setup cannot touch an attribute
            # that was never assigned.
            workers = []
            try:
                self.iot_reader = IotReader(self.queue_master)
                workers.append(self.iot_reader)
                self.iot_writer = IotWriter()
                workers.append(self.iot_writer)
                self.mqtt_reader = MqttReader(self.queue_master)
                workers.append(self.mqtt_reader)
                self.mqtt_writer = MqttWriter()
                workers.append(self.mqtt_writer)

                log.info(f"Starting {self.__class__.__name__}")
                self._process_events()
            except Exception:
                log.error("Error during gateway configuration")
                traceback.print_exc(file=sys.stdout)
            finally:
                # Signal every worker first, then wait, so they stop in parallel.
                for worker in workers:
                    worker.terminate()
                for worker in workers:
                    worker.join()

        log.warning('Terminating Gateway')

    def _process_events(self):
        """Consume ``queue_master`` until ``done`` is set or 'terminate' arrives."""
        while not self.done:
            try:
                action, data = self.queue_master.get()
                if not self._dispatch(action, data):
                    break
            except Exception:
                # A bad message must not stop the gateway.
                log.error("Error during message parsing")
                traceback.print_exc(file=sys.stdout)

    def _dispatch(self, action, data):
        """Route one queue item; return ``False`` when the loop should stop.

        Actions are matched by equality. A substring test would treat
        fragments such as ``''`` or ``'event'`` as MQTT events.
        """
        if action == 'on_mqtt_event':
            self.on_mqtt_event(data)
        elif action == 'on_iot_event':
            self.on_iot_event(data)
        elif action == 'terminate':
            return False
        else:
            log.error(f'Unknown action, action={action}, data={data}')
        return True

    def terminate(self):
        """Ask the gateway to stop; the sentinel wakes a blocked ``get``."""
        self.done = True
        self.queue_master.put(('terminate', None))

    def on_iot_event(self, data):
        """Forward an event received from the IoT side to the MQTT writer."""
        log.info(f'Event from iot device, forwarding to mqtt, data={data}')
        self.mqtt_writer.send(data)

    def on_mqtt_event(self, data):
        """Forward an event received from the MQTT side to the IoT writer."""
        log.info(f'Event from mqtt, forwarding to iot device, data={data}')
        self.iot_writer.send(data)
# Creating the gateway also starts its thread (Gateway.__init__ calls start()).
gateway = Gateway()

# Your main thread is free here, you could start a webserver and display
# a dashboard. Or just wait, like below.
try:
    gateway.join()
except KeyboardInterrupt:
    # First Ctrl+C: request a graceful shutdown and wait for it to finish.
    log.info("Sending terminate command...")
    gateway.terminate()

    try:
        gateway.join()
    except KeyboardInterrupt:
        # Second Ctrl+C: force the process down. sys.exit() is not enough
        # here, because interpreter shutdown still waits for every
        # non-daemon thread, which are the threads that failed to stop.
        log.info("Killing the app...")
        # os._exit() skips the normal cleanup, so flush buffered output first.
        sys.stdout.flush()
        sys.stderr.flush()
        os._exit(0)

log.info("Bye!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment